public interface Directory extends Node { /** * get service type. * * @return service type. */ Class getInterface(); /** * list invokers. * * @return invokers */ List list(Invocation invocation) throws RpcException; }



@Override public List doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled throw new RpcException("xxxxx"); } List invokers = null; Map localMethodInvokerMap = this.methodInvokerMap; // local reference if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { // invocation 应该是RpcInvocation,封装方法名、方法参数 String methodName = RpcUtils.getMethodName(invocation); Object[] args = RpcUtils.getArguments(invocation); // 下面一大段就是表达,通过方法名,获取到对应的invoker列表 if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) { // The routing can be enumerated according to the first parameter // 根据方法名和第一个参数 invokers = localMethodInvokerMap.get(methodName + "." + args[0]); } if (invokers == null) { invokers = localMethodInvokerMap.get(methodName); } if (invokers == null) { invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } if (invokers == null) { Iterator iterator = localMethodInvokerMap.values().iterator(); if (iterator.hasNext()) { invokers = iterator.next(); } } } return invokers == null ? new ArrayList(0) : invokers; } 监听注册中心




@Override public synchronized void notify(List urls) { // 用来保存configurators、routers、providers节点的url信息 List invokerUrls = new ArrayList(); List routerUrls = new ArrayList(); List configuratorUrls = new ArrayList(); for (URL url : urls) { // 根据协议和类别,把url添加到对应的集合 String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("xxxx"); } } // configurators if (configuratorUrls != null && !configuratorUrls.isEmpty()) { // 把URL转成Configurator对象 this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && !routerUrls.isEmpty()) { // 把URL转成Router对象 List routers = toRouters(routerUrls); if (routers != null) { // null - do nothing setRouters(routers); } } List localConfigurators = this.configurators; // local reference // merge override parameters this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && !localConfigurators.isEmpty()) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers // 刷新provider refreshInvoker(invokerUrls); } 刷新invoker列表



private void refreshInvoker(List invokerUrls) { if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.methodInvokerMap = null; // Set the method invoker map to null destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access Map oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet(); //Cached invoker urls, convenient for comparison this.cachedInvokerUrls.addAll(invokerUrls); } if (invokerUrls.isEmpty()) { return; } // Translate url list to Invoker map // 把URL对象转成Invoker Map newUrlInvokerMap = toInvokers(invokerUrls); // Change method name to map Invoker Map // 根据方法名分类 Map newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // state change // If the calculation is wrong, it is not processed. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error("xxxxxxx"); return; } this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }

刷新列表的过程是,先把集合里面的URL对象转成Invoker,得到一个**的Map。接着,根据方法名归类,得到一个**的映射关系。最后,将同一个组的Invoker合并,得到methodInvokerMap,也就是doList方法中通过方法名返回对应invoker List的缓存。



最常用的是条件路由,格式是 消费者匹配条件 => 提供者地址列表过滤条件,比如host = => host = 表示ip为10.20.153.10的消费者只能调用ip是10.20.153.11的提供者。










@SPI(FailoverCluster.NAME) public interface Cluster { /** * Merge the directory invokers to a virtual invoker. * * @param * @param directory * @return cluster invoker * @throws RpcException */ @Adaptive Invoker join(Directory directory) throws RpcException; } FailoverClusterInvoker



@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List invokers, LoadBalance loadbalance) throws RpcException { List copyinvokers = invokers; checkInvokers(copyinvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); // 获取重试次数,Constants.DEFAULT_RETRIES=2,所以默认应该是3次 int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } // 通过负载均衡选择一个invoker Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 发起调用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { // ... 打印日志 } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException("...异常信息"); } FailfastClusterInvoker


@Override public Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); Invoker invoker = select(loadbalance, invocation, invokers, null); try { return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException("xxxx"); } } FailbackClusterInvoker


@Override protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("xxxx"); // 记录失败的调用 addFailed(invocation, this); return new RpcResult(); // ignore } } private void addFailed(Invocation invocation, AbstractClusterInvoker router) { if (retryFuture == null) { synchronized (this) { if (retryFuture == null) { // 开启定时任务,默认每隔5s执行一次 retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // collect retry statistics try { // 失败重试 retryFailed(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } // 把调用失败的添加到Map failed.put(invocation, router); } void retryFailed() { if (failed.size() == 0) { return; } for (Map.Entry>(failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker invoker = entry.getValue(); try { invoker.invoke(invocation); // 调用成功后移除 failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } } 负载均衡





@Override public Invoker select(List invokers, URL url, Invocation invocation) { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) return invokers.get(0); // 由子类实现 return doSelect(invokers, url, invocation); } protected abstract Invoker doSelect(List invokers, URL url, Invocation invocation);


/** * 计算权重 * * @param invoker * @param invocation * @return */ protected int getWeight(Invoker invoker, Invocation invocation) { // 获取weight参数值 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if (weight > 0) { // 得到启动时间 long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L); if (timestamp > 0L) { // 计算启动多久了 int uptime = (int) (System.currentTimeMillis() - timestamp); // 获取warmup参数值 int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { // 启动时间小于预热时间,则降低权重 weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; } static int calculateWarmupWeight(int uptime, int warmup, int weight) { int ww = (int) ((float) uptime / ((float) warmup / (float) weight)); return ww < 1 ? 1 : (ww > weight ? weight : ww); } RandomLoadBalance



@Override protected Invoker doSelect(List invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers int totalWeight = 0; // The sum of weights boolean sameWeight = true; // Every invoker has the same weight? // 判断是否等权重 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), // select randomly based on totalWeight. // 生成一个随机数 int offset = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { // 随机数与权重相减 offset -= getWeight(invokers.get(i), invocation); // 随机数小于权重,说明落到指定区间 if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(random.nextInt(length)); } LeastActiveLoadBalance


@Override protected Invoker doSelect(List invokers, URL url, Invocation invocation) { // Number of invokers int length = invokers.size(); // The least active value of all invokers // 最少活跃数 int leastActive = -1; // The number of invokers having the same least active value (leastActive) // 有多少个活跃数等于最小活跃数的提供者数量 int leastCount = 0; // The index of invokers having the same least active value (leastActive) int[] leastIndexs = new int[length]; // The sum of with warmup weights int totalWeight = 0; // Initial value, used for comparision int firstWeight = 0; // Every invoker has the same weight value? boolean sameWeight = true; // 循环遍历每个提供者 for (int i = 0; i < length; i++) { Invoker invoker = invokers.get(i); // Active number, 获取当前提供者活跃数 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); int afterWarmup = getWeight(invoker, invocation); // Weight // Restart, when find a invoker having smaller least active value. if (leastActive == -1 || active < leastActive) { leastActive = active; // Record the current least active value leastCount = 1; // Reset leastCount, count again based on current leastCount leastIndexs[0] = i; // Reset totalWeight = afterWarmup; // Reset firstWeight = afterWarmup; // Record the weight the first invoker sameWeight = true; // Reset, every invoker has the same weight value? // If current invoker's active value equals with leaseActive, then accumulating. } else if (active == leastActive) { leastIndexs[leastCount++] = i; // Record index number of this invoker totalWeight += afterWarmup; // Add this invoker's weight to totalWeight. // If every invoker has the same weight? if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // assert(leastCount > 0) if (leastCount == 1) { // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexs[0]); } // 如果多个提供者的活跃数等于最小活跃数,通过加权随机算法选择其中一个 if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), // select randomly based on totalWeight. int offsetWeight = random.nextInt(totalWeight) + 1; // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight 接口名.方法名 private final ConcurrentMap>(); @SuppressWarnings("unchecked") @Override protected Invoker doSelect(List invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); // key -> 接口名.方法名 String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // 用于验证对象变化 int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector selector = (ConsistentHashSelector) selectors.get(key); if (selector == null || selector.identityHashCode != identityHashCode) { // 创建选择器,并缓存 selectors.put(key, new ConsistentHashSelector(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector) selectors.get(key); } // 调用选择器的select方法 return selector.select(invocation); } private static final class ConsistentHashSelector { // hash环中,保存 值与invoker的映射关系 private final TreeMap virtualInvokers; // 每个invoker的虚拟节点数 private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List invokers, String methodName, int identityHashCode) { // 基于红黑树实现的有序map this.virtualInvokers = new TreeMap(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // 获取虚拟节点数,默认是160 this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); // 获取需要hash的参数下标,默认获取第1个参数 // 配置例子 String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); // 暂时发现argumentIndex的作用是toKey方法中用到,即根据参数决定在圆环上的落点 argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 设置虚拟节点与invoker的映射关系 for (Invoker invoker : invokers) { // 获取提供者的ip:port String address = invoker.getUrl().getAddress(); // 以160个虚拟节点为例,生成40份加密,每份经过4次hash,即还是每个invoker有160个虚拟节点 for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker select(Invocation invocation) { // 根据参数生成key String key = toKey(invocation.getArguments()); byte[] digest = md5(key); // md5加密和hash key,得到对应invoker return selectForKey(hash(digest, 0)); } private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } private Invoker selectForKey(long hash) { // ailMap(hash, true):获取大于hash值的所有key,true表示包括等于。 // .firstEntry() -> 获取第一个 Map.Entry entry = virtualInvokers.tailMap(hash, true).firstEntry(); if (entry == null) { // 如果为null,说明取第一个 entry = virtualInvokers.firstEntry(); } return entry.getValue(); } private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF)




